1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package storm.starter.bolt;
19
20 import backtype.storm.Config;
21 import backtype.storm.topology.BasicOutputCollector;
22 import backtype.storm.topology.OutputFieldsDeclarer;
23 import backtype.storm.tuple.Fields;
24 import backtype.storm.tuple.Tuple;
25 import backtype.storm.tuple.Values;
26 import com.google.common.collect.Lists;
27 import org.testng.annotations.DataProvider;
28 import org.testng.annotations.Test;
29 import storm.starter.tools.MockTupleHelpers;
30
31 import java.util.Map;
32
33 import static org.fest.assertions.api.Assertions.assertThat;
34 import static org.mockito.Matchers.any;
35 import static org.mockito.Mockito.*;
36
37 public class IntermediateRankingsBoltTest {
38
39 private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
40 private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
41 private static final Object ANY_OBJECT = new Object();
42 private static final int ANY_TOPN = 10;
43 private static final long ANY_COUNT = 42;
44
45 private Tuple mockRankableTuple(Object obj, long count) {
46 Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
47 when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
48 return tuple;
49 }
50
51 @DataProvider
52 public Object[][] illegalTopN() {
53 return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
54 }
55
56 @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
57 public void negativeOrZeroTopNShouldThrowIAE(int topN) {
58 new IntermediateRankingsBolt(topN);
59 }
60
61 @DataProvider
62 public Object[][] illegalEmitFrequency() {
63 return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
64 }
65
66 @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
67 public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
68 new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
69 }
70
71 @DataProvider
72 public Object[][] legalTopN() {
73 return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
74 }
75
76 @Test(dataProvider = "legalTopN")
77 public void positiveTopNShouldBeOk(int topN) {
78 new IntermediateRankingsBolt(topN);
79 }
80
81 @DataProvider
82 public Object[][] legalEmitFrequency() {
83 return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
84 }
85
86 @Test(dataProvider = "legalEmitFrequency")
87 public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
88 new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
89 }
90
91 @Test
92 public void shouldEmitSomethingIfTickTupleIsReceived() {
93
94 Tuple tickTuple = MockTupleHelpers.mockTickTuple();
95 BasicOutputCollector collector = mock(BasicOutputCollector.class);
96 IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
97
98
99 bolt.execute(tickTuple, collector);
100
101
102
103 verify(collector).emit(any(Values.class));
104 }
105
106 @Test
107 public void shouldEmitNothingIfNormalTupleIsReceived() {
108
109 Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
110 BasicOutputCollector collector = mock(BasicOutputCollector.class);
111 IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
112
113
114 bolt.execute(normalTuple, collector);
115
116
117 verifyZeroInteractions(collector);
118 }
119
120 @Test
121 public void shouldDeclareOutputFields() {
122
123 OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
124 IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
125
126
127 bolt.declareOutputFields(declarer);
128
129
130 verify(declarer, times(1)).declare(any(Fields.class));
131 }
132
133 @Test
134 public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
135
136 IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
137
138
139 Map<String, Object> componentConfig = bolt.getComponentConfiguration();
140
141
142 assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
143 Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
144 assertThat(emitFrequencyInSeconds).isGreaterThan(0);
145 }
146 }